package com.hcj.springcloud.controller;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.hcj.springcloud.service.CompletableFutureService;
import com.netflix.discovery.util.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.task.TaskExecutor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

/**
 * Demo REST controller that exercises several {@link CompletableFuture} usage patterns
 * against {@link CompletableFutureService}: fire-and-wait, result collection, chaining,
 * combining two futures, and failure handling.
 *
 * <p>The {@link #main(String[])} and {@link #test(String...)} methods are stand-alone
 * scratch code (rate limiters, reflection, JSON array building) and are not used by the
 * HTTP endpoints.
 */
@RestController
@RequestMapping("/")
@Slf4j
public class TestCompletableFutureController {

    // Injected from configuration; not read by any method in this class.
    @Value("${server.port}")
    private Integer port;
    @Value("${spring.application.name}")
    private String springApplicationName;

    // Executor handed explicitly to the service for the "x" requests below.
    @Resource
    private Executor taskExecutor;

    @Autowired
    private CompletableFutureService completableFutureService;


    /**
     * Blocks the calling thread until every given future has completed, logs the elapsed
     * time, and then collects the results.
     *
     * @param startTime          start timestamp in epoch milliseconds
     * @param completableFutures futures to wait for
     * @return a JSON object with one entry per future, keyed {@code completableFuture0},
     *         {@code completableFuture1}, ... in argument order
     * @throws ExecutionException   declared for {@link CompletableFuture#get()}; note that a
     *                              failed future already surfaces earlier as an unchecked
     *                              {@link java.util.concurrent.CompletionException} from
     *                              {@code join()}
     * @throws InterruptedException declared for {@link CompletableFuture#get()}
     */
    private JSONObject resolveResult(Long startTime, CompletableFuture<?>... completableFutures)
            throws ExecutionException, InterruptedException {
        // join() makes the calling thread wait for all futures before continuing.
        CompletableFuture.allOf(completableFutures).join();
        long elapsedSeconds = (System.currentTimeMillis() - startTime) / 1000;
        log.info("Elapsed time: {} seconds", elapsedSeconds);
        return collectResults(completableFutures);
    }

    /**
     * Collects the results without an up-front {@code allOf(...).join()}; each future is
     * awaited individually by {@link CompletableFuture#get()} while its result is read, and
     * the elapsed time is logged after the last one has been read.
     *
     * @param startTime          start timestamp in epoch milliseconds
     * @param completableFutures futures whose results are collected
     * @return a JSON object with one entry per future, keyed {@code completableFuture0},
     *         {@code completableFuture1}, ... in argument order
     * @throws ExecutionException   if any future completed exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private JSONObject resolveResultNotJoin(Long startTime, CompletableFuture<?>... completableFutures)
            throws ExecutionException, InterruptedException {
        // No allOf() here: creating the combined future without joining it waits for nothing.
        JSONObject jo = collectResults(completableFutures);
        long elapsedSeconds = (System.currentTimeMillis() - startTime) / 1000;
        log.info("Elapsed time: {} seconds", elapsedSeconds);
        return jo;
    }

    /**
     * Reads every future's result (blocking on {@code get()} where necessary), logs it, and
     * stores it under {@code completableFuture<index>}.
     */
    private JSONObject collectResults(CompletableFuture<?>[] futures)
            throws ExecutionException, InterruptedException {
        JSONObject jo = new JSONObject();
        for (int i = 0; i < futures.length; i++) {
            // Read the value once and reuse it for both the log line and the response.
            Object value = futures[i].get();
            log.info("--> {}", value);
            jo.put("completableFuture" + i, value);
        }
        return jo;
    }

    /**
     * Asynchronous calls without a return value: starts four {@code runAsync} calls (the last
     * one on {@code taskExecutor}) and waits for all of them.
     *
     * @param name request parameter forwarded to the first call
     * @return JSON object holding each future's result ({@code null} for {@code Void} futures)
     */
    @GetMapping("/test_1")
    public Object test_1(String name) throws InterruptedException, ExecutionException {
        long startTime = System.currentTimeMillis();
        CompletableFuture<Void> user = completableFutureService.runAsync(name);
        CompletableFuture<Void> missjin = completableFutureService.runAsync("missjin");
        CompletableFuture<Void> mojombo = completableFutureService.runAsync("mojombo");
        CompletableFuture<Void> x = completableFutureService.runAsync("x", taskExecutor);

        return this.resolveResult(startTime, user, missjin, mojombo, x);
    }

    /**
     * Asynchronous calls with return values: waits until all of them have finished, then
     * returns the collected results to the client.
     *
     * @param name request parameter forwarded to the first call
     * @return JSON object holding each future's result
     */
    @GetMapping("/test_2")
    public Object test_2(String name) throws InterruptedException, ExecutionException {
        long startTime = System.currentTimeMillis();
        CompletableFuture<String> user = completableFutureService.supplyAsync(name);
        CompletableFuture<String> missjin = completableFutureService.supplyAsync("missjin");
        CompletableFuture<String> mojombo = completableFutureService.supplyAsync("mojombo");
        CompletableFuture<String> x = completableFutureService.supplyAsync("x", taskExecutor);

        return this.resolveResult(startTime, user, missjin, mojombo, x);
    }

    /**
     * Asynchronous calls with return values, collected without the up-front
     * {@code allOf(...).join()}; each result is awaited individually.
     *
     * @param name request parameter forwarded to the first call
     * @return JSON object holding each future's result
     */
    @GetMapping("/test_22")
    public Object test_22(String name) throws InterruptedException, ExecutionException {
        long startTime = System.currentTimeMillis();
        CompletableFuture<String> user = completableFutureService.supplyAsync(name);
        CompletableFuture<String> missjin = completableFutureService.supplyAsync("missjin");
        CompletableFuture<String> mojombo = completableFutureService.supplyAsync("mojombo");
        CompletableFuture<String> x = completableFutureService.supplyAsync("x", taskExecutor);

        return this.resolveResultNotJoin(startTime, user, missjin, mojombo, x);
    }


    /**
     * Asynchronous calls with return values whose results are post-processed: each string
     * result is parsed into a {@link JSONObject} in a follow-up asynchronous stage.
     *
     * @param name request parameter forwarded to the first call
     * @return JSON object holding each parsed result
     */
    @GetMapping("/test_3")
    public Object test_3(String name) throws InterruptedException, ExecutionException {
        long startTime = System.currentTimeMillis();
        CompletableFuture<JSONObject> user = completableFutureService.supplyAsync(name)
                .thenApplyAsync(body -> JSONObject.parseObject(body));
        CompletableFuture<JSONObject> mojombo = completableFutureService.supplyAsync("mojombo")
                .thenApplyAsync(body -> JSONObject.parseObject(body));
        CompletableFuture<JSONObject> x = completableFutureService.supplyAsync("x", taskExecutor)
                .thenApplyAsync(body -> JSONObject.parseObject(body));

        return this.resolveResult(startTime, user, mojombo, x);
    }

    /**
     * Asynchronous calls executed one after another: once the first call has finished, a
     * second call is started and awaited inside the follow-up stage before the first result
     * is parsed and returned.
     *
     * <p>The second call is best-effort: its result is not used, and a failure is logged
     * rather than propagated.
     *
     * @param name request parameter forwarded to the first call
     * @return JSON object holding the parsed result of the first call
     */
    @GetMapping("/test_4")
    public Object test_4(String name) throws InterruptedException, ExecutionException {
        long startTime = System.currentTimeMillis();
        CompletableFuture<JSONObject> user = completableFutureService.supplyAsync(name).thenApplyAsync(first -> {
            log.info("thenApplyAsync:{}", "开始");
            try {
                // Block until the second call is done so the two calls run sequentially.
                completableFutureService.supplyAsync("mojombo").get();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the executor thread can observe it.
                Thread.currentThread().interrupt();
                log.warn("Interrupted while waiting for the chained call", e);
            } catch (ExecutionException | RuntimeException e) {
                log.warn("Chained call failed; continuing with the first result", e);
            }
            return JSONObject.parseObject(first);
        });

        return this.resolveResult(startTime, user);
    }

    /**
     * Two asynchronous calls combined with {@code thenCombineAsync}: when both have finished,
     * their results are parsed and aggregated into one {@link JSONArray}.
     *
     * @param name request parameter forwarded to one of the two calls
     * @return the not-yet-awaited future producing the array {@code [nameResult, mojomboResult]}
     */
    @GetMapping("/test_5")
    public Object test_5(String name) {
        long startTime = System.currentTimeMillis();
        CompletableFuture<String> mojombo = completableFutureService.supplyAsync("mojombo");
        return completableFutureService.supplyAsync(name)
                .thenCombineAsync(mojombo, (nameResult, mojomboResult) -> {
                    JSONArray ja = new JSONArray();
                    ja.add(JSONObject.parseObject(nameResult));
                    ja.add(JSONObject.parseObject(mojomboResult));
                    long expenseTime = System.currentTimeMillis() - startTime;
                    log.info("耗时：{}秒", expenseTime / 1000);
                    return ja;
                });
    }


    /**
     * Same combination as {@link #test_5(String)}, plus failure handling: the combining stage
     * deliberately throws a {@link NumberFormatException}, and the trailing {@code handle}
     * stage logs it.
     *
     * @param name request parameter forwarded to one of the two calls
     * @return the not-yet-awaited future; because the combining stage always fails here, the
     *         {@code handle} stage completes it with {@code null}
     */
    @GetMapping("/test_6")
    public Object test_6(String name) {
        long startTime = System.currentTimeMillis();
        CompletableFuture<String> mojombo = completableFutureService.supplyAsync("mojombo");
        return completableFutureService.supplyAsync(name)
                .thenCombineAsync(mojombo, (nameResult, mojomboResult) -> {
                    JSONArray ja = new JSONArray();
                    ja.add(JSONObject.parseObject(nameResult));
                    ja.add(JSONObject.parseObject(mojomboResult));
                    // Deliberately triggers a NumberFormatException to demonstrate handle().
                    Integer.valueOf("aa");
                    long expenseTime = System.currentTimeMillis() - startTime;
                    log.info("耗时：{}秒", expenseTime / 1000);
                    return ja;
                })
                // handle() runs whether the previous stage succeeded or failed.
                .handle((res, ex) -> {
                    if (ex != null) {
                        // Pass the throwable as the last argument so the stack trace is kept.
                        log.warn("有异常：{}， 请处理", ex.getMessage(), ex);
                    }
                    return res;
                });
    }


    /**
     * Scratch entry point: calls {@link #test(String...)} and then tries out the Netflix and
     * Guava rate limiters.
     *
     * @param args unused
     * @throws InterruptedException if the sleep between acquisitions is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        String a = "aa";
        String b = "bb";
        test(a, b);
        // Netflix rate limiter; arguments are (burstSize, averageRate) — result is ignored.
        RateLimiter rateLimiter = new RateLimiter(TimeUnit.SECONDS);
        rateLimiter.acquire(10, 1);

        // Guava rate limiter (token-bucket style), created with 1000 permits per second.
        com.google.common.util.concurrent.RateLimiter rateLimiterGuava =
                com.google.common.util.concurrent.RateLimiter.create(1000d);
        for (int i = 0; i < 5; i++) {
            log.info("{}, {}", rateLimiterGuava.tryAcquire(), rateLimiterGuava.acquire());
        }
        Thread.sleep(1000L);
        // The message now matches the actual sleep of one second.
        log.info("休眠1秒后，可以拿到令牌{},{}", rateLimiterGuava.tryAcquire(), rateLimiterGuava.acquire());
    }

    /**
     * Scratch method: logs the given varargs, logs every parameter of every method declared
     * on this class (via reflection), and prints a {@link JSONArray} built from the values of
     * two sample maps.
     *
     * @param params values to log
     */
    public static void test(String... params) {
        log.info("{}", Arrays.asList(params));
        for (String s : params) {
            log.info("{}", s);
        }

        for (Method method : TestCompletableFutureController.class.getDeclaredMethods()) {
            for (Parameter p : method.getParameters()) {
                log.info("{}===>{}====>{}", method.getName(), p.getType().getName(), p.getName());
            }
        }

        List<Map<String, Object>> resultMap = new ArrayList<>();
        Map<String, Object> first = new HashMap<>();
        first.put("key_1", "value_1");
        resultMap.add(first);
        Map<String, Object> second = new HashMap<>();
        second.put("key_2", "value_2");
        resultMap.add(second);

        // Flatten the values of every map into a single JSON array.
        JSONArray ja = new JSONArray();
        for (Map<String, Object> entry : resultMap) {
            ja.addAll(entry.values());
        }

        System.out.println(ja);
    }

}
